package dao

import java.util
import java.util.concurrent.{Executors, ExecutorService}

import org.apache.hadoop.hbase.client.Put
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by chenjianwen on 2016/3/11.
  */
class WriteResultToHbase(dao: ResultHBaseDao) extends extension.Extension {

  // Per-thread buffer of pending HBase Puts, lazily created on first use.
  val threadLocal = new ThreadLocal[util.ArrayList[Put]]()

  /**
    * (Re)creates an HBase table, overwriting any existing table with the same name.
    *
    * @param name    table name
    * @param familys column-family names to create
    */
  def createTable(name: String, familys: List[String]): Unit = {
    dao.createTableWithOverWrite(name, familys)
  }

  /**
    * Buffers one user's recommendation ratings into the thread-local Put list.
    *
    * Bug fix: the original `if ((allPut = threadLocal.get()) == null)` compared
    * the Unit result of an assignment against null — always false in Scala — so
    * `threadLocal.set` was never called and `allPut` could be reassigned to a
    * null thread-local value, throwing an NPE at `allPut.add(put)`. We now read
    * the thread-local first and initialise it only when absent. Column
    * qualifiers remain 1-based, matching the original counter which was
    * incremented before its first use (the leftover debug println was removed).
    *
    * @param datas (userId, ratings produced for that user)
    */
  @Deprecated
  def insertUserRecommendDatasToTemp(datas: (Int, Array[Rating])): Unit = {

    // Lazily initialise the per-thread buffer.
    var allPut = threadLocal.get()
    if (allPut == null) {
      allPut = new util.ArrayList[Put]()
      threadLocal.set(allPut)
    }

    datas._2.zipWithIndex.foreach { case (rating, idx) =>
      val i = idx + 1 // 1-based qualifier index, as in the original code
      val put = new Put(rating.user.toString.getBytes)
      // NOTE: "productsocre" is the column family used throughout this class;
      // left as-is (typo included) because it must match the table schema.
      put.add("productsocre".getBytes, s"productAndScore#${i}".getBytes,
        s"${rating.product}#${rating.rating}".getBytes)
      allPut.add(put)
    }

    //dao.insertRows(allPut,"user_product_recommend")
  }

  /**
    * Implicitly converts an immutable Scala List of Puts into the
    * java.util.List the DAO expects.
    */
  implicit def listToList(pust: List[Put]): util.List[Put] = {
    val list = new util.ArrayList[Put]()
    pust.foreach(list.add(_))
    list
  }

  /**
    * Writes all ratings to the "user_product_recommend" table, one Put per
    * rating, row-keyed by user id.
    *
    * @param rows ratings to persist
    */
  def insertAllRows(rows: List[Rating]): Unit = {
    val allPut = rows.map { x =>
      val put = new Put(x.user.toString.getBytes)
      put.add("productsocre".getBytes, s"productAndScore_${x.product}".getBytes,
        s"${x.product}#${x.rating}".getBytes)
      // Return the Put explicitly instead of relying on add's fluent return value.
      put
    }
    dao.insertRows(allPut, "user_product_recommend")
  }


}
